package com.daoshu.mti.kafkaclient.handle;

import com.alibaba.fastjson.JSON;
import com.daoshu.mti.kafkaclient.bean.TKmJurisdictionEntity;
import com.daoshu.mti.kafkaclient.bean.TKmWbEntity;
import com.daoshu.mti.kafkaclient.bean.TKmWbswEntity;
import com.daoshu.mti.kafkaclient.bean.TKmZdryYjEntity;
import com.daoshu.mti.kafkaclient.constant.Topic;
import com.daoshu.mti.kafkaclient.consumer.TKmWbswBaseConsumer;
import com.daoshu.mti.kafkaclient.service.TKmJurisdictionService;
import com.daoshu.mti.kafkaclient.service.TKmWbService;
import com.daoshu.mti.kafkaclient.service.TKmWbswService;
import com.daoshu.mti.kafkaclient.service.TKmZdryYjService;
import com.daoshu.mti.kafkaclient.utils.AddressOrLngLatConvertUtil;
import com.daoshu.mti.kafkaclient.utils.DateUtils;
import com.daoshu.mti.kafkaclient.utils.ZdryYjUtilBean;
import com.daoshu.mti.kafkaclient.websocket.SocketType;
import com.daoshu.mti.kafkaclient.websocket.WebSocketUtils;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import org.springframework.transaction.annotation.Transactional;
import org.springframework.util.ObjectUtils;

import java.text.SimpleDateFormat;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.Arrays;
import java.util.Date;
import java.util.List;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

/**
 * @Classname TKmTldpHandle
 * @Description TODO
 * @Date 2020/1/17 21:15
 * @Created by duchaof
 * @Version 1.0
 */
@Slf4j
@Component
public class TKmWbswHandle extends TKmWbswBaseConsumer {
    private static SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");


    @Autowired
    private TKmWbswService tKmWbswService;

    @Autowired
    private TKmZdryYjService tKmZdryYjService;

    @Autowired
    private WebSocketUtils webSocketUtils;

    @Autowired
    private AddressOrLngLatConvertUtil addressOrLngLatConvertUtil;

    @Autowired
    private TKmJurisdictionService tKmJurisdictionService;

    @Autowired
    private TKmWbService tKmWbService;

    private static Lock lock = new ReentrantLock();

    @Override
    @Transactional(rollbackFor = Exception.class)
    protected void onDealMessage(String message) {
        try {
            log.info("接收到topic：" + Topic.WBSW + "的消息" + ",线程：" + Thread.currentThread().getName() + "在" + sdf.format(new Date()) + "开始处理接受到的消息：" + message);
            TKmWbswEntity tKmWbswEntity = JSON.parseObject(message, TKmWbswEntity.class);
            if (null != tKmWbswEntity.getLoginAt() && tKmWbswEntity.getLoginAt().length() > 0) {
                tKmWbswEntity.setLoginTm(DateUtils.parseStrToDate(tKmWbswEntity.getLoginAt(), "yyyy/MM/dd HH:mm:ss"));
            }
            Integer res = null;
            lock.lock();
            try {
                // 根据身份证号 和 预警时间判断重复数据
                Boolean repeatFlag = tKmWbswService.judgeRepeatData(tKmWbswEntity);
                if (repeatFlag) return;

                // 是否具有网吧名称 和 经纬度
                if (null == tKmWbswEntity.getWbmc() || null == tKmWbswEntity.getWbjd() || null == tKmWbswEntity.getWbwd() ||
                        tKmWbswEntity.getWbmc().length() <= 0 || tKmWbswEntity.getWbjd().length() <= 0 || tKmWbswEntity.getWbwd().length() <= 0) {
                    if (null != tKmWbswEntity.getServiceCode() && tKmWbswEntity.getServiceCode().length() > 0) {
                        TKmWbEntity wb = tKmWbService.getWbXxByCode(tKmWbswEntity.getServiceCode());
                        if (null != wb) {
                            if (null != wb.getName() && wb.getName().length() > 0) tKmWbswEntity.setWbmc(wb.getName());
                            if (null != wb.getLng() && wb.getLng().length() > 0) tKmWbswEntity.setWbjd(wb.getLng());
                            if (null != wb.getLat() && wb.getLat().length() > 0) tKmWbswEntity.setWbwd(wb.getLat());
                        }
                    }
                }

                res = tKmWbswService.insert(tKmWbswEntity);
            } catch (Exception e) {
                log.error("插入网吧预警数据时发生异常：==========>{}", e.getMessage());
            } finally {
                lock.unlock();
            }
            if (res > 0) {
                //log.info("数据插入成功，插入成功的数据为：" + message);
                TKmZdryYjEntity tKmZdryYjEntity = tKmWbswEntity.transtoZdryEntity();

                //网吧设定 当前辖区
                tKmZdryYjEntity.setCurrentJur(addressOrLngLatConvertUtil.checkAddress(tKmWbswEntity.getWbjd(), tKmWbswEntity.getWbwd()));

                // 查询预警发生地的派出所代码和名称
                TKmJurisdictionEntity tKmJurisdictionEntity = tKmJurisdictionService.queryYjdPcsByLngAndLat(tKmZdryYjEntity.getX(), tKmZdryYjEntity.getY());
                if (null != tKmJurisdictionEntity) {
                    tKmZdryYjEntity.setYjdPcs(tKmJurisdictionEntity.getMc());
                    tKmZdryYjEntity.setYjdPcsCode(String.format("%.0f", Double.parseDouble(tKmJurisdictionEntity.getDm())));
                    tKmZdryYjEntity.setYjdSsfj(tKmJurisdictionEntity.getSsfj());
                }

                tKmZdryYjService.insert(tKmZdryYjEntity);
                ZdryYjUtilBean zdryYjUtilBean = tKmWbswEntity.transtoZdryYjUtilBean();
                zdryYjUtilBean.setYjwybs(tKmZdryYjEntity.getWybs());

                if (null != tKmJurisdictionEntity) {
                    StringBuilder sb = new StringBuilder("重点人员【");
                    sb.append(tKmZdryYjEntity.getRyxm()).append("】在").append(tKmJurisdictionEntity.getMc()).append("的管辖范围内的")
                            .append(tKmZdryYjEntity.getYjdd()).append("上网");
                    zdryYjUtilBean.setYjMsg(sb.toString());
                }

                if (null != tKmWbswEntity.getIdCode()) {
                    //查询此条预警是否是灰名单 或者是 已稳控  如果是就不推送
                    TKmZdryYjEntity entity = tKmZdryYjService.authGkjbAndWkzt(tKmWbswEntity.getIdCode());
                    if (!ObjectUtils.isEmpty(entity)) {
                        return;
                    }
                    List<String> tokens = tKmZdryYjService.findWebsocketPsuhUserListBySfzh(tKmWbswEntity.getIdCode());
                    if (null != tokens) {
                        webSocketUtils.sendWebsockMessageToUser(tokens,
                                Arrays.asList("wbsw", JSON.toJSONString(zdryYjUtilBean)), SocketType.ZDRRYJXX.getCode());

                    }
                }
            } else {
                log.error("数据插入失败，插入失败的数据为：" + message);
            }
        } catch (Exception e) {
            log.error("插入数据库发生异常=====>{}", e.getMessage());
        }
    }
}
